前一天我們建立了一些變數,如lines、bsdLines等,有沒有感覺他們跟一般變數沒兩樣呢?在Spark中這些變數被稱為RDD(Resilient Distributed Datasets)。其實RDD就是我們常見的集合概念,比較特別的是實際資料集可以為橫跨數個結點所組成。
RDD有三個特性:
不可更動(Immutable)
:每個RDD都是不能被改變的(可以像Java的String一樣),想要更新的?從既有中再建立另一個吧。這樣的作法乍看下可能感覺怪怪的,但仔細想想,要讓資料容易用於分散式系統,Immutable是關鍵的一環,因為每個RDD都保證不會被更動。彈性(Resilient)
:分散式環境中忽然有節點失效是正常的(而且要平常心XD),那上面Spark正在使用或建立的RDD怎麼辦?沒關係,Spark會想辦法幫你重建
。這與之後會提到的RDD lineage概念有關。分散式(Distributed)
:資料集可跨多個節點,並儲存在每個節點的記憶體內,恩...所以Spark是記憶體怪獸XD,優點當然就是執行速度較快,但要小心網路資料交換(shuffling)這類昂貴操作。以先前的lines、bsdLines為例,因為每個RDD都是immutable,也就是說,只要紀錄了操作與建立行為
(有點類似DB的commit log),bsdLines RDD就可以從lines RDD取得,因此可以串出RDD間的關係,例如: line -> badLines -> OtherRDD1 -> OtherRDD2 -> ...。我們將其稱為RDD lineage(族譜)
。所以假設存放badLines RDD的節點損毀了(一或多台),但只要儲存line RDD的節點還在的話,是不是就能還原badLines了呢!當然底層通常會搭配一個分散式並且有副本(replication)特性的儲存系統,例如常見的Hadoop HDFS或S3等。
RDD的操作依性質主要分為兩類:
之前用的filter
就是一個transformation操作,而foreach
則是action操作。初學者剛開始可能會分不清楚哪些操作是transformation,哪些又是action?這時候只要記住action類不會產生新的RDD,產生新的RDD就是transformation即可,會比較容易分辨~
開始來玩一些操作吧,先看看常見的map、flatMap、count、distinct
[Snippet.3] RDD map操作
map的宣告格式類似以下
def map(f:(T) => U ):RDD[U]
map會帶入一個函式,從現在的RDD型別(T
),透過操作,產生出一個新RDD(型別為U)。輸入輸出的型別可以不同。例如我們可以將整數集合RDD內的每個元素取平方:
scala> val numbers=sc.parallelize(List(1,2,3,4,5)) ①
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val numberSquared=numbers.map(num=>num*num) ②
numberSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26
scala> numberSquared.foreach(num=>print(num+" ")) ③
1 4 9 16 25
① 取得RDD有幾種方式,除了之前那種從外部資源(例如檔案)中取得外,其他常見的方式還有從一般集合中轉換,例如範例中透過sc.parallelize
函式將List集合物件轉換成RDD物件。parallelize可以接收Scala中任何實作Seq trait(trait類似Java介面)的集合物件,這邊不用了解那麼多,只要知道可以將集合轉成集合概念的RDD即可~
② 對numbers RDD執行map
操作,賦予一個num=>num*num
函式,每個numbers的元素都會被平方,而且map是轉換操作,所以會回傳一個numberSquared RDD。
③ 最後透過foreach(行動類操作)印出結果檢查
另外再玩點RDD集合的操作吧:
scala> numberSquared.first
res9: Int = 1
scala> numberSquared.top(2)
res8: Array[Int] = Array(25, 16)
看圖就知道哩,first
取出集合中的第一個元素,而top(N)
對整數來說就是取出最大的N個
如果重複輸出幾次numberSquared會發現:
scala> numberSquared.foreach(num=>print(num+" "))
1 4 9 16 25
scala> numberSquared.foreach(num=>print(num+" "))
1 16 25 9 4
疑,順序不同了耶?沒有保證List順序嗎?List不是有序的嗎?ListRDD應該也是吧?
的確,RDD內部還是有保證元素順序,但因為foreach這個函式操作是平行印出
才導致順序會變動。
可以使用一個collect()的action操作
,他會返還一個新的普通集合物件*
,常接於一串transformation操作的後面回傳最終的結果,例如這樣的格式:
val normalCollection = KindOfRDD.map(XXX)
.flatMap(YYY)
.filter(ZZZ)
.OtherTransformatioin1
.OtherTransformatioin2
.OtherTransformatioin3.
...
...
.collect()
scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)
scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)
scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)
你看,不管執行幾次順序還是一樣的吧~
collect
函式請慎用,雖然它用來Demo一些結果很方便。但因為它是返還非RDD的普通集合物件
到單一節點
(driver執行所在節點),請想像龐大的分散式RDD在沒有經過謹慎處理的情況下,全部傳到單一節點上......痛過才懂XD~
Casting也是寫程式常見的功能,尤其是數字與字串間的轉換,這在RDD中也沒有問題,
最後講一個轉成字串,每個字串頭尾對調,並回傳給另外一個RDD的綜合操作吧:
scala> val castToReverseString=numberSquared.map(_.toString.reverse).collect
castToReverseString: Array[String] = Array(1, 4, 9, 61, 52)
numberSquared.map(\_.toString.reverse).collect
這個表達式做了幾件事情:
_
這個陌生的符號?這個是Scala中的placeholder
,可以當作"我不care要傳給我的變數叫啥名字,我只要知道有他就可以了",在案例中就是依序傳入1,4,9,16,25啦toString
轉成字串,再接reverse
函式。就會得到上述的結果